/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.raysdata.atlas.mysql.client;

import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
import com.raysdata.atlas.common.AtlasUtil;
import com.raysdata.atlas.common.DataSource;

import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.model.discovery.AtlasSearchResult;
import org.apache.atlas.model.instance.*;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.commons.cli.ParseException;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springblade.metadata.entity.MetadataVersion;
import org.springblade.metadata.feign.IMetadataVersionClient;

import java.sql.*;
import java.time.LocalDateTime;
import java.util.*;

import static java.util.Arrays.asList;
import static org.apache.atlas.type.AtlasTypeUtil.toAtlasRelatedObjectId;
import static org.apache.atlas.type.AtlasTypeUtil.toAtlasRelatedObjectIds;


/**
 * A bridge utility that imports metadata from a MySQL database and registers
 * it in Atlas.
 */

public class MysqlMetaStoreBridge {
	private static final Logger LOG = LoggerFactory.getLogger(MysqlMetaStoreBridge.class);

	public static final String QUALIFIED_NAME               = "qualifiedName";
	public static final String REFERENCEABLE_ATTRIBUTE_NAME = QUALIFIED_NAME;
	public static final String CLUSTER_SUFFIX              = "@cl1";
	public static final String DATABASE_TYPE               = "DB";
	public static final String MANAGED_TABLE               = "Managed";
	public static final String FACT_CLASSIFICATION         = "Fact";
	public static final String TABLE_TYPE                  = "Table";
	public static final String COLUMN_TYPE                 = "Column";
	public static final String STORAGE_DESC_TYPE           = "StorageDesc";
	private static int pageLimit = 10000;

	private AtlasClientV2 atlasClientV2;
	private MysqlClientModel model;
	private IMetadataVersionClient iMetadataVersionClient;
	private DataSource dataSource;
	public MysqlMetaStoreBridge(AtlasClientV2 atlasClientV2, MysqlClientModel model, IMetadataVersionClient iMetadataVersionClient,DataSource dataSource) throws InstantiationException, IllegalAccessException, ClassNotFoundException, SQLException {
		this.atlasClientV2 = atlasClientV2;
		this.model = model;
		this.iMetadataVersionClient = iMetadataVersionClient;
		this.dataSource = dataSource;
		//this.metadataNamespace = model.getMetadataNamespace();
        //this.dataSource = dataSource;
		//this.databaseToImport = dataSource.getDatabase();
		//this.tableToImport = model.getTableToImport();
		//initMysqlConnect(dataSource);
	}
	public MysqlMetaStoreBridge(MysqlClientModel model){
		this.model = model;
	}

	public void syn() {

		try {

			importMysqlMetadata();

		} catch (ParseException e) {
			LOG.error("Failed to parse arguments. Error: ", e.getMessage());
		} catch (Exception e) {
			LOG.error("Import failed", e);
		} finally {
            if( atlasClientV2 !=null) {
                atlasClientV2.close();
            }
		}

	}

	/**
	 * Construct a HiveMetaStoreBridge.
	 * 
	 * @param hiveConf {@link } for Hive component in the cluster
	 */
//    public MysqlMetaStoreBridge(Configuration atlasProperties,  AtlasClientV2 atlasClientV2) throws Exception {
//        this.metadataNamespace          = getMetadataNamespace(atlasProperties);
//        this.hiveClient                 = Hive.get(hiveConf);
//        this.atlasClientV2              = atlasClientV2;
//        this.convertHdfsPathToLowerCase = atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false);
//        this.awsS3AtlasModelVersion     = atlasProperties.getString(HOOK_AWS_S3_ATLAS_MODEL_VERSION, HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2);
//        if (atlasProperties != null) {
//            pageLimit = atlasProperties.getInteger(HOOK_HIVE_PAGE_LIMIT, 10000);
//        }
//    }

	private Connection conn = null;

	void initMysqlConnect(DataSource dataSource)
			throws InstantiationException, IllegalAccessException, ClassNotFoundException, SQLException {
		Properties connectionProps = new Properties();
		connectionProps.put("user", dataSource.getUserName());
		connectionProps.put("password", dataSource.getPassword());
		Class.forName("com.mysql.jdbc.Driver").newInstance();
		conn = (Connection) DriverManager.getConnection(dataSource.getHost(), connectionProps);
	}

	void initMysqlConnect1()
			throws InstantiationException, IllegalAccessException, ClassNotFoundException, SQLException {
		Properties connectionProps = new Properties();
		connectionProps.put("user", this.model.getUser());
		connectionProps.put("password", this.model.getPassword());
		Class.forName("com.mysql.cj.jdbc.Driver").newInstance();
		conn = (Connection) DriverManager.getConnection(this.model.getJdbcUrl(), connectionProps);
	}

	public void closeConnect() throws SQLException {
		if (this.conn != null) {
			this.conn.close();
		}
	}

	public List<String> getAllDataBases() {
		Statement stmt = null;
		ResultSet resultset = null;
		List<String> dbs = new ArrayList<>();
		try {
			stmt = conn.createStatement();
			resultset = stmt.executeQuery("SHOW DATABASES;");

			if (stmt.execute("SHOW DATABASES;")) {
				resultset = stmt.getResultSet();
			}

			while (resultset.next()) {
				System.out.println(resultset.getString("Database"));
				dbs.add(resultset.getString("Database"));
			}
		} catch (SQLException ex) {
			// handle any errors
			ex.printStackTrace();
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} finally {
			// release resources
			if (resultset != null) {
				try {
					resultset.close();
				} catch (SQLException sqlEx) {
				}
				resultset = null;
			}

			if (stmt != null) {
				try {
					stmt.close();
				} catch (SQLException sqlEx) {
				}
				stmt = null;
			}
			if (conn != null) {
				try {
					conn.close();
				} catch (SQLException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
				conn = null;
			}
		}
		return dbs;
	}

	void desctroy() {

	}


	@VisibleForTesting
	public void importMysqlMetadata() throws Exception {
		LOG.info("Importing Mysql metadata");

		importDatabases();
	}

	private void importDatabases() throws Exception {
		List<String> databaseNames = new ArrayList<>();

		databaseNames.add(this.model.getDatabaseToImport());

		if (!CollectionUtils.isEmpty(databaseNames)) {
			LOG.info("Found {} databases", databaseNames.size());

			for (String databaseName : databaseNames) {
				AtlasEntity dbEntity = createDatabase(databaseName, "fxp database", "Fxp", "hdfs://host:8000/apps/warehouse/sales");

				if (dbEntity != null) {
					importTables(dbEntity,databaseName, this.model.isFailOnError());
				}
			}
		} else {
			LOG.info("No database found");
		}
	}
	/*private void importDatabases(String databaseToImport, String tableToImport) throws Exception {
		List<String> databaseNames = null;

		databaseNames = getAllDataBases();

		if (!CollectionUtils.isEmpty(databaseNames)) {
			LOG.info("Found {} databases", databaseNames.size());

			for (String databaseName : databaseNames) {
				if(databaseName.equals(this.databaseToImport)) {
					AtlasEntityWithExtInfo dbEntity = registerDatabase(databaseName);

					if (dbEntity != null) {
						//importTables(dbEntity.getEntity(), databaseName, tableToImport, this.model.isFailOnError());
					}
				}

			}
		} else {
			LOG.info("No database found");
		}
	}*/

	public List<String> getAllTables() {

		try {
			initMysqlConnect1();
		} catch (InstantiationException | IllegalAccessException | ClassNotFoundException | SQLException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		List<String> schemas = new ArrayList<>();

		Statement stmt = null;
		ResultSet rs = null;
		try {
			stmt = conn.createStatement();
			// Retrieving the data
			rs = stmt.executeQuery("Show tables");
			System.out.println("Tables in the current database: ");
			while (rs.next()) {
				String tableName = rs.getString(1);
				schemas.add(tableName);
			}
		} catch (SQLException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} finally {
			// release resources
			if (rs != null) {
				try {
					rs.close();
				} catch (SQLException sqlEx) {
				}
				rs = null;
			}

			if (stmt != null) {
				try {
					stmt.close();
				} catch (SQLException sqlEx) {
				}
				stmt = null;
			}
		}

		return schemas;
	}

	public List<String> getColumns(String tableName) {

		try {
			initMysqlConnect1();
		} catch (InstantiationException | IllegalAccessException | ClassNotFoundException | SQLException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		List<String> columns = new ArrayList<>();

		Statement stmt = null;
		ResultSet rs = null;
		try {
			stmt = conn.createStatement();
			// Retrieving the data
			rs = stmt.executeQuery("SELECT * FROM " + tableName + " where 1=0");
			ResultSetMetaData metaData = rs.getMetaData();

			int columnCount = metaData.getColumnCount();
			for (int i = 1; i <= columnCount; i++) {
				String columnName = metaData.getColumnName(i);
				columns.add(columnName);
			}
		} catch (SQLException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} finally {
			// release resources
			if (rs != null) {
				try {
					rs.close();
				} catch (SQLException sqlEx) {
				}
				rs = null;
			}

			if (stmt != null) {
				try {
					stmt.close();
				} catch (SQLException sqlEx) {
				}
				stmt = null;
			}
		}

		return columns;
	}


	/**
	 * Imports all tables for the given db
	 * 
	 //* @param dbEntity
	 * @param failOnError
	 * @throws Exception
	 */
	private int importTables(AtlasEntity dbEntity,String databaseName, final boolean failOnError)
			throws Exception {
		int tablesImported = 0;

		final List<String> tableNames = getAllTables();

		if (!CollectionUtils.isEmpty(tableNames)) {
			LOG.info("Found {} tables to import in database {}", tableNames.size(), this.model.getDatabaseToImport());

			try {
				for (String tableName : tableNames) {
					int imported = importTable(dbEntity,databaseName, tableName, failOnError);

					tablesImported += imported;
				}
			} finally {
				if (tablesImported == tableNames.size()) {
					LOG.info("Successfully imported {} tables from database {}", tablesImported, this.model.getDatabaseToImport());
				} else {
					LOG.error("Imported {} of {} tables from database {}. Please check logs for errors during import",
							tablesImported, tableNames.size(), this.model.getDatabaseToImport());
				}
			}
		} else {
			LOG.info("No tables to import in database {}", this.model.getDatabaseToImport());
		}

		return tablesImported;
	}

	@VisibleForTesting
	public int importTable(AtlasEntity dbEntity,String databaseName, String tableName, final boolean failOnError)
			throws Exception {
		List<String> columnsList = getColumns(tableName);
		List<AtlasEntity> colAtlasEntity = new ArrayList<>();
		for (String col:columnsList){
			colAtlasEntity.add(createColumn(databaseName, tableName, col, "int", "time id"));
		}
		AtlasEntityWithExtInfo ret = findTable(tableName);
		if (null !=ret){
			String versionJson = new Gson().toJson(ret);
			String guid = ret.getEntity().getGuid();
			MetadataVersion metadataVersion = new MetadataVersion();
			metadataVersion.setContentJson(versionJson);
			metadataVersion.setMetadataId(guid);
			metadataVersion.setPublisher("元数据采集任务");
			metadataVersion.setCreateTime(LocalDateTime.now());
			iMetadataVersionClient.saveMetadataVersion(metadataVersion);
		}

		createTable(tableName, "sales fact table", dbEntity, "Fxp", MANAGED_TABLE, colAtlasEntity, FACT_CLASSIFICATION);

		return 1;
	}

	AtlasEntity createDatabase(String databaseName, String description, String owner, String locationUri, String... classificationNames) throws Exception {
		AtlasEntity entity = new AtlasEntity(DATABASE_TYPE);

		AtlasEntityWithExtInfo ret = null;
		ret = findDatabase(databaseName);
		
		AtlasUtil.addDateSourId(entity, this.dataSource.getId());
		if (ret == null) {
			// set attributes
			entity.setAttribute("name", databaseName);
			entity.setAttribute(REFERENCEABLE_ATTRIBUTE_NAME, databaseName + CLUSTER_SUFFIX);
			entity.setAttribute("description", description);
			entity.setAttribute("owner", owner);
			entity.setAttribute("locationuri", locationUri);
			entity.setAttribute("createTime", System.currentTimeMillis());
			// set classifications
			entity.setClassifications(toAtlasClassifications(classificationNames));
			return createInstance(entity);
		} else {
			LOG.info("Database {} is already registered - id={}. Updating it.", databaseName, ret.getEntity().getGuid());
			entity = ret.getEntity();
			AtlasUtil.addDateSourId(entity, this.dataSource.getId());
			entity.setAttribute("name", databaseName);
			entity.setAttribute(REFERENCEABLE_ATTRIBUTE_NAME, databaseName + CLUSTER_SUFFIX);
			entity.setAttribute("description", description);
			entity.setAttribute("owner", owner);
			entity.setAttribute("locationuri", locationUri);
			entity.setAttribute("createTime", System.currentTimeMillis());
			// set classifications
			entity.setClassifications(toAtlasClassifications(classificationNames));
			ret.setEntity(entity);

			updateInstance(ret);
		}
		return ret.getEntity();

	}

	private void updateInstance(AtlasEntityWithExtInfo entity) throws AtlasServiceException {
		if (LOG.isDebugEnabled()) {
			LOG.debug("updating {} entity: {}", entity.getEntity().getTypeName(), entity);
		}

		atlasClientV2.updateEntity(entity);

		LOG.info("Updated {} entity: name={}, guid={}", entity.getEntity().getTypeName(), entity.getEntity().getAttribute(REFERENCEABLE_ATTRIBUTE_NAME), entity.getEntity().getGuid());
	}


	private AtlasEntity createInstance(AtlasEntity entity) throws Exception {
		return createInstance(new AtlasEntityWithExtInfo(entity));
	}

	private AtlasEntity createInstance(AtlasEntityWithExtInfo entityWithExtInfo) throws Exception {
		AtlasEntity             ret      = null;
		EntityMutationResponse  response = atlasClientV2.createEntity(entityWithExtInfo);
		List<AtlasEntityHeader> entities = response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE);

		if (CollectionUtils.isNotEmpty(entities)) {
			AtlasEntityWithExtInfo getByGuidResponse = atlasClientV2.getEntityByGuid(entities.get(0).getGuid());

			ret = getByGuidResponse.getEntity();

			System.out.println("Created entity of type [" + ret.getTypeName() + "], guid: " + ret.getGuid());
		}

		return ret;
	}

	private List<AtlasClassification> toAtlasClassifications(String[] classificationNames) {
		List<AtlasClassification> ret             = new ArrayList<>();
		List<String>              classifications = asList(classificationNames);

		if (CollectionUtils.isNotEmpty(classifications)) {
			for (String classificationName : classifications) {
				ret.add(new AtlasClassification(classificationName));
			}
		}

		return ret;
	}

	AtlasEntity createTable(String name, String description, AtlasEntity database, String owner, String tableType,
							List<AtlasEntity> columns, String... classificationNames) throws Exception {
		AtlasEntity tblEntity = new AtlasEntity(TABLE_TYPE);

		// set attributes
		tblEntity.setAttribute("name", name);
		tblEntity.setAttribute(REFERENCEABLE_ATTRIBUTE_NAME, name + CLUSTER_SUFFIX);
		tblEntity.setAttribute("description", description);
		tblEntity.setAttribute("owner", owner);
		tblEntity.setAttribute("tableType", tableType);
		tblEntity.setAttribute("createTime", System.currentTimeMillis());
		tblEntity.setAttribute("lastAccessTime", System.currentTimeMillis());
		tblEntity.setAttribute("retention", System.currentTimeMillis());

		// set relationship attributes
		AtlasEntity storageDesc = createStorageDescriptor("hdfs://host:8000/apps/warehouse/sales", "TextInputFormat", "TextOutputFormat", true);
		storageDesc.setRelationshipAttribute("table", toAtlasRelatedObjectId(tblEntity));

		tblEntity.setRelationshipAttribute("db", toAtlasRelatedObjectId(database));
		tblEntity.setRelationshipAttribute("sd", toAtlasRelatedObjectId(storageDesc));
		tblEntity.setRelationshipAttribute("columns", toAtlasRelatedObjectIds(columns));

		// set classifications
		tblEntity.setClassifications(toAtlasClassifications(classificationNames));

		AtlasEntityWithExtInfo entityWithExtInfo = new AtlasEntityWithExtInfo();

		entityWithExtInfo.setEntity(tblEntity);
		entityWithExtInfo.addReferredEntity(storageDesc);

		for (AtlasEntity column : columns) {
			column.setRelationshipAttribute("table", toAtlasRelatedObjectId(tblEntity));

			entityWithExtInfo.addReferredEntity(column);
		}

		return createInstance(entityWithExtInfo);
	}

	AtlasEntity createStorageDescriptor(String location, String inputFormat, String outputFormat, boolean compressed) {
		AtlasEntity ret = new AtlasEntity(STORAGE_DESC_TYPE);

		ret.setAttribute("name", "sd:" + location);
		ret.setAttribute(REFERENCEABLE_ATTRIBUTE_NAME, "sd:" + location + CLUSTER_SUFFIX);
		ret.setAttribute("location", location);
		ret.setAttribute("inputFormat", inputFormat);
		ret.setAttribute("outputFormat", outputFormat);
		ret.setAttribute("compressed", compressed);

		return ret;
	}

	AtlasEntity createColumn(String databaseName, String tableName, String columnName, String dataType, String comment, String... classificationNames) {
		AtlasEntity ret = new AtlasEntity(COLUMN_TYPE);

		// set attributes
		ret.setAttribute("name", columnName);
		ret.setAttribute(REFERENCEABLE_ATTRIBUTE_NAME, databaseName + "." + tableName + "." + columnName + CLUSTER_SUFFIX);
		ret.setAttribute("dataType", dataType);
		ret.setAttribute("comment", comment);

		// set classifications
		ret.setClassifications(toAtlasClassifications(classificationNames));

		return ret;
	}

	private String lower(String str) {
		if (StringUtils.isEmpty(str)) {
			return "";
		}

		return str.toLowerCase().trim();
	}



	/**
	 * Construct the qualified name used to uniquely identify a Table instance in
	 * Atlas.
	 * 
	 * @param metadataNamespace Name of the cluster to which the Hive component
	 *                          belongs
	 * @param dbName            Name of the Hive database to which the Table belongs
	 * @param tableName         Name of the Hive table
	 * @param isTemporaryTable  is this a temporary table
	 * @return Unique qualified name to identify the Table instance in Atlas.
	 */
	public static String getTableQualifiedName(String metadataNamespace, String dbName, String tableName,
			boolean isTemporaryTable) {
		String tableTempName = tableName;

		return String.format("%s.%s@%s", dbName.toLowerCase(), tableTempName.toLowerCase(), metadataNamespace);
	}


	/**
	 * Construct the qualified name used to uniquely identify a Table instance in
	 * Atlas.
	 *
	 * @param metadataNamespace Metadata namespace of the cluster to which the Hive
	 *                          component belongs
	 * @param dbName            Name of the Hive database to which the Table belongs
	 * @param tableName         Name of the Hive table
	 * @return Unique qualified name to identify the Table instance in Atlas.
	 */
	public static String getTableQualifiedName(String metadataNamespace, String dbName, String tableName) {
		return getTableQualifiedName(metadataNamespace, dbName, tableName, false);
	}

	public static String getStorageDescQFName(String tableQualifiedName) {
		return tableQualifiedName + "_storage";
	}

	public static String getColumnQualifiedName(final String tableQualifiedName, final String colName) {
		final String[] parts = tableQualifiedName.split("@");
		final String tableName = parts[0];
		final String metadataNamespace = parts[1];

		return String.format("%s.%s@%s", tableName, colName.toLowerCase(), metadataNamespace);
	}

	public static long getTableCreatedTime() {
		return 0;
	}

	private void clearRelationshipAttributes(AtlasEntitiesWithExtInfo entities) {
		if (entities != null) {
			if (entities.getEntities() != null) {
				for (AtlasEntity entity : entities.getEntities()) {
					clearRelationshipAttributes(entity);
					;
				}
			}

			if (entities.getReferredEntities() != null) {
				clearRelationshipAttributes(entities.getReferredEntities().values());
			}
		}
	}

	private void clearRelationshipAttributes(AtlasEntityWithExtInfo entity) {
		if (entity != null) {
			clearRelationshipAttributes(entity.getEntity());

			if (entity.getReferredEntities() != null) {
				clearRelationshipAttributes(entity.getReferredEntities().values());
			}
		}
	}

	private void clearRelationshipAttributes(Collection<AtlasEntity> entities) {
		if (entities != null) {
			for (AtlasEntity entity : entities) {
				clearRelationshipAttributes(entity);
			}
		}
	}

	private void clearRelationshipAttributes(AtlasEntity entity) {
		if (entity != null && entity.getRelationshipAttributes() != null) {
			entity.getRelationshipAttributes().clear();
		}
	}

	private boolean isTableWithDatabaseName(String tableName) {
		boolean ret = false;
		if (tableName.contains(".")) {
			ret = true;
		}
		return ret;
	}

	private List<AtlasEntityHeader> getAllDatabaseInCluster() throws AtlasServiceException {

		List<AtlasEntityHeader> entities = new ArrayList<>();

		return entities;
	}

	private List<AtlasEntityHeader> getAllTablesInDb(String databaseGuid) throws AtlasServiceException {

		List<AtlasEntityHeader> entities = new ArrayList<>();
		final int pageSize = pageLimit;

//		for (int i = 0;; i++) {
//			int offset = pageSize * i;
//			LOG.info("Retrieving tables: offset={}, pageSize={}", offset, pageSize);
//
//			AtlasSearchResult searchResult = atlasClientV2.relationshipSearch(databaseGuid, HIVE_TABLE_DB_EDGE_LABEL,
//					null, null, true, pageSize, offset);
//
//			List<AtlasEntityHeader> entityHeaders = searchResult == null ? null : searchResult.getEntities();
//			int tableCount = entityHeaders == null ? 0 : entityHeaders.size();
//
//			LOG.info("Retrieved {} tables of {} database", tableCount, databaseGuid);
//
//			if (tableCount > 0) {
//				entities.addAll(entityHeaders);
//			}
//
//			if (tableCount < pageSize) { // last page
//				break;
//			}
//		}

		return entities;
	}

	public String getHiveDatabaseName(String qualifiedName) {

		if (StringUtils.isNotEmpty(qualifiedName)) {
			String[] split = qualifiedName.split("@");
			if (split.length > 0) {
				return split[0];
			}
		}
		return null;
	}

	public String getHiveTableName(String qualifiedName, boolean isTemporary) {
		return null;
	}

	private void deleteByGuid(List<String> guidTodelete) throws AtlasServiceException {

		if (CollectionUtils.isNotEmpty(guidTodelete)) {

			for (String guid : guidTodelete) {
				EntityMutationResponse response = atlasClientV2.deleteEntityByGuid(guid);

				if (response.getDeletedEntities().size() < 1) {
					LOG.info("Entity with guid : {} is not deleted", guid);
				} else {
					LOG.info("Entity with guid : {} is deleted", guid);
				}
			}
		} else {
			LOG.info("No Entity to delete from Atlas");
		}
	}

	public void deleteEntitiesForNonExistingHiveMetadata(boolean failOnError) throws Exception {

	}

	/**
	 * Gets the atlas entity for the database
	 *
	 * @param databaseName      database Name
	// * @param metadataNamespace cluster name
	 * @return AtlasEntity for database if exists, else null
	 * @throws Exception
	 */
	private AtlasEntityWithExtInfo findDatabase(String databaseName) throws Exception {
		if (LOG.isDebugEnabled()) {
			LOG.debug("Searching Atlas for database {}", databaseName);
		}

		String typeName = DATABASE_TYPE;

		return findEntity(typeName, databaseName+CLUSTER_SUFFIX, true, true);
	}

	private AtlasEntityWithExtInfo findTable(String tableName) throws Exception {
		if (LOG.isDebugEnabled()) {
			LOG.debug("Searching Atlas for table {}", tableName);
		}

		String typeName = TABLE_TYPE;

		return findEntity(typeName, tableName+CLUSTER_SUFFIX, true, true);
	}

	private AtlasEntityWithExtInfo findEntity(final String typeName, final String qualifiedName, boolean minExtInfo,
											  boolean ignoreRelationship) throws AtlasServiceException {
		AtlasEntityWithExtInfo ret = null;

		AtlasSearchResult  result = AtlasUtil.searchByQualifiedName(atlasClientV2, typeName,qualifiedName);
		if(result.getEntities()!=null && !result.getEntities().isEmpty()) {
			ret = atlasClientV2.getEntityByAttribute(typeName,
					Collections.singletonMap(REFERENCEABLE_ATTRIBUTE_NAME, qualifiedName), minExtInfo, ignoreRelationship);
		}

		return ret;
	}
    
	/**
	 * Construct the qualified name used to uniquely identify a Database instance in
	 * Atlas.
	 *
	 * @param metadataNamespace Name of the cluster to which the Hive component
	 *                          belongs
	 * @param dbName            Name of the Hive database
	 * @return Unique qualified name to identify the Database instance in Atlas.
	 */
	public static String getDBQualifiedName(String metadataNamespace, String dbName) {
		return String.format("%s@%s", dbName.toLowerCase(), metadataNamespace);
	}
}
